package dbus.incrementssync;

import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.function.DbusProcessFunction;
import dbus.model.Flow;
import dbus.schema.FlatMessageSchema;
import dbus.sink.HbaseSyncSink;
import dbus.source.FlowSource;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/**
 * Entry point of the incremental synchronisation job.
 *
 * <p>The job consumes {@link FlatMessage} records from the Kafka topic {@code test},
 * keys them by database and table, connects them with a broadcast stream of
 * {@link Flow} configuration produced by {@link FlowSource}, runs the pair through
 * {@link DbusProcessFunction} and hands the resulting tuples to {@link HbaseSyncSink}.
 */
public class IncrementSyncApp {
    /** Descriptor of the broadcast state that carries the {@link Flow} configuration. */
    public static final MapStateDescriptor<String, Flow> flowStateDescriptor =
            new MapStateDescriptor<String, Flow>(
                    "flowBroadCastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Flow>() {
                    }));

    /**
     * Builds the streaming topology and submits it under the job name
     * {@code IncrementSyncApp}.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consume the change messages from Kafka.
        FlinkKafkaConsumer010<FlatMessage> kafkaSource =
                new FlinkKafkaConsumer010<>("test", new FlatMessageSchema(), kafkaConsumerProperties());
        DataStreamSource<FlatMessage> changeEvents = env.addSource(kafkaSource);

        // Preserve the ordering of database messages: records of the same database
        // and table are routed to the same key group / partition.
        KeyedStream<FlatMessage, String> eventsByTable = changeEvents.keyBy(new DatabaseTableKeySelector());

        // Read the configuration stream and broadcast it.
        DataStreamSource<Flow> flowConfig = env.addSource(new FlowSource());
        BroadcastStream<Flow> broadcastFlows = flowConfig.broadcast(flowStateDescriptor);

        // Connect the data stream with the configuration stream.
        DataStream<Tuple2<FlatMessage, Flow>> matched = eventsByTable
                .connect(broadcastFlows)
                .process(new DbusProcessFunction())
                .setParallelism(1);

        matched.addSink(new HbaseSyncSink());

        env.execute("IncrementSyncApp");
    }

    /** Assembles the properties handed to the Kafka consumer. */
    private static Properties kafkaConsumerProperties() {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "note01:9092,note02:9092,note03:9092");
        kafkaProps.setProperty("zookeeper.connect", "note01:2181,note02:2181,note03:2181");
        kafkaProps.setProperty("group.id", "group1");
        kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.setProperty("auto.offset.reset", "latest");
        kafkaProps.setProperty("flink.partition-discovery.interval-millis", "30000");
        return kafkaProps;
    }

    /** Keys a message by the concatenation of its database name and table name. */
    private static final class DatabaseTableKeySelector implements KeySelector<FlatMessage, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(FlatMessage message) throws Exception {
            String database = message.getDatabase();
            String table = message.getTable();
            return database + table;
        }
    }
}
